package main

import (
	"flag"
	"fmt"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/nsqio/go-nsq"
)

const (
	// topic is the NSQ topic that online notices are published to and
	// consumed from.
	topic = "online"
)

// Command-line flags.
var (
	// host is the TCP address handed to net.Listen for client connections.
	host = flag.String("host", "", "listen host")
	// nsqdAddr is the nsqd TCP address used by both the producer and the
	// consumer.
	nsqdAddr = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "nsqd address")
)

// init parses the command-line flags during package initialization, so the
// flag values are populated before main runs.
//
// NOTE(review): parsing in init happens before flags registered later (for
// example by a test binary) exist — consider moving this call into main.
func init() {
	flag.Parse()
}

// Process-wide state shared between the accept loop, the per-connection
// goroutines and the NSQ message handler.
var (
	// mgr is the set of client connections: keys are net.Conn values and the
	// stored values are always nil.
	mgr sync.Map
	// producer publishes online notices to nsqd; it is assigned in main.
	producer *nsq.Producer
	// consumer receives online notices from nsqd; it is assigned in main.
	consumer *nsq.Consumer
)

// serve handles one freshly accepted client connection.
//
// It registers conn in mgr so that broadcasts reach it, reads a single chunk
// of at most 1024 bytes which is treated as the user's name, and publishes
// "<name> Online At <time>" to the NSQ topic. On success the connection stays
// open and registered so HandleMessage can keep writing notices to it.
//
// serve runs in its own goroutine for every client, so errors are logged and
// confined to this connection: panicking here would terminate the whole
// server because of a single misbehaving client.
func serve(conn net.Conn) {
	mgr.Store(conn, nil)

	buffer := make([]byte, 1024)
	n, err := conn.Read(buffer)
	if err != nil {
		// The client went away (or failed) before announcing itself: drop it
		// from the broadcast set and release the socket.
		fmt.Println("Read:", err)
		mgr.Delete(conn)
		if cerr := conn.Close(); cerr != nil {
			fmt.Println("Close:", cerr)
		}
		return
	}
	buffer = buffer[:n]
	fmt.Println("User: " + string(buffer) + " Online Notify Others")

	buffer = append(buffer, []byte(" Online At "+time.Now().String())...)
	fmt.Println("Publish:", string(buffer), "Length:", len(buffer))

	// A failed publish only means this notice is lost; the client is still
	// connected and can keep receiving notices about other users.
	if err := producer.Publish(topic, buffer); err != nil {
		fmt.Println("Publish:", err)
	}
}

// TopicHandler is the NSQ message handler registered on the consumer; it
// carries no state of its own.
type TopicHandler struct{}

// HandleMessage broadcasts the body of an NSQ message to every connection
// currently registered in mgr.
//
// A connection whose write fails is removed from mgr and closed, so dead
// clients neither receive further writes nor leak their sockets. The method
// always returns nil, so a failed delivery to one client never causes the
// message to be reported as failed.
func (h *TopicHandler) HandleMessage(m *nsq.Message) error {
	fmt.Println("Consumer Receive Message:", string(m.Body))

	mgr.Range(func(k, _ interface{}) bool {
		conn := k.(net.Conn)
		if _, err := conn.Write(m.Body); err != nil {
			fmt.Println(err)
			mgr.Delete(k)
			// Release the socket as well; removing it from mgr alone would
			// leave the file descriptor open.
			if cerr := conn.Close(); cerr != nil {
				fmt.Println(cerr)
			}
		}
		// Keep iterating so the remaining clients still get the message.
		return true
	})
	return nil
}

// main starts the TCP listener, connects the NSQ producer and consumer, and
// then accepts client connections forever, handling each one in its own
// goroutine via serve.
//
// The consumer uses an ephemeral channel named after the listening port, so
// every server instance (one per port) receives its own copy of each notice.
// Any setup or accept failure is fatal and panics.
func main() {
	l, err := net.Listen("tcp", *host)
	if err != nil {
		panic(err)
	}

	producer, err = nsq.NewProducer(*nsqdAddr, nsq.NewConfig())
	if err != nil {
		panic(err)
	}

	// Take the port from the address the listener is actually bound to rather
	// than from the raw flag value: the flag may be empty (the default), lack
	// a colon, request port 0, or contain an IPv6 literal with several colons,
	// all of which break a naive split on ":".
	addr := l.Addr().String()
	port := addr[strings.LastIndex(addr, ":")+1:]
	channel := port + "#ephemeral"

	consumer, err = nsq.NewConsumer(topic, channel, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
	consumer.AddHandler(&TopicHandler{})
	if err = consumer.ConnectToNSQD(*nsqdAddr); err != nil {
		panic(err)
	}

	for {
		conn, err := l.Accept()
		if err != nil {
			panic(err)
		}
		go serve(conn)
	}
}
